package sync

import (
	"encoding/json"
	"fmt"
	"github.com/nsqio/go-nsq"
)

// SubConfig describes a single NSQ subscription: which topic and channel to
// consume from and the callback that receives each message payload.
type SubConfig struct {
	// Topic is the NSQ topic name handed to nsq.NewConsumer by Sub.
	Topic   string
	// Channel is the NSQ channel name handed to nsq.NewConsumer by Sub.
	Channel string
	// Handler receives the raw payload bytes; Sub wraps it in a MessageHandler
	// before registering it on the consumer.
	Handler func(data []byte) error
}

// NewSubConfig allocates a SubConfig for the given topic and channel.
// The Handler field is left nil; the caller assigns it before subscribing.
func NewSubConfig(topic string, ch string) *SubConfig {
	cfg := new(SubConfig)
	cfg.Topic = topic
	cfg.Channel = ch
	return cfg
}
// Sub creates an NSQ consumer for conf.Topic / conf.Channel, registers
// conf.Handler on it (wrapped in a MessageHandler) and connects the consumer
// to the nsqlookupd address stored in ms.NsqSubAddr.
//
// It returns an error when conf is nil, when the consumer cannot be created,
// or when connecting to nsqlookupd fails. On a failed connect the consumer is
// stopped before returning, so a consumer that will never receive messages is
// not left running.
func (ms *ModelSync) Sub(conf *SubConfig) (err error) {
	if conf == nil {
		return fmt.Errorf("sync: Sub called with nil SubConfig")
	}
	consumer, err := nsq.NewConsumer(conf.Topic, conf.Channel, nsq.NewConfig())
	if err != nil {
		return err
	}
	consumer.SetLoggerLevel(nsq.LogLevelWarning)
	consumer.AddHandler(&MessageHandler{
		Handler: conf.Handler,
	})
	if err = consumer.ConnectToNSQLookupd(ms.NsqSubAddr); err != nil {
		// The handler is already registered at this point; shut the consumer
		// down instead of abandoning it.
		consumer.Stop()
		return err
	}
	return nil
}

// SubInitNeed subscribes to this service's init-need topic, named
// "<SYNC_PREFIX>.<TOPIC_INIT_NEED>.<ms.ServiceName>", on the "default"
// channel. Every message body is decoded from JSON into an InitNeed and
// passed to handle.
//
// It returns an error when handle is nil or when Sub fails to set up the
// subscription. For an individual message, a JSON decode failure is returned
// from the per-message callback and handle is not invoked.
func (ms *ModelSync) SubInitNeed(handle func(in *InitNeed) error) (err error) {
	if handle == nil {
		return fmt.Errorf("sync: SubInitNeed requires a non-nil handle")
	}
	conf := NewSubConfig(
		fmt.Sprintf("%s.%s.%s", SYNC_PREFIX, TOPIC_INIT_NEED, ms.ServiceName),
		"default",
	)
	conf.Handler = func(data []byte) error {
		in := new(InitNeed)
		// Decode into the struct itself. Passing &in (a pointer to the pointer)
		// lets a JSON "null" body reset in to nil, which would then be handed
		// to handle.
		if e := json.Unmarshal(data, in); e != nil {
			return e
		}
		return handle(in)
	}
	return ms.Sub(conf)
}
// SubInitFeed subscribes to the init-feed topic that publisher pubName emits
// for this service. The topic is
// "<SYNC_PREFIX>.<TOPIC_INIT_FEED>.<pubName>_<ms.ServiceName>" and the channel
// is "<ms.ServiceName>-<usage>". Every message body is decoded from JSON into
// an InitFeed and its Data field is passed to handle.
//
// It returns an error when handle is nil or when Sub fails to set up the
// subscription. For an individual message, a JSON decode failure is returned
// from the per-message callback and handle is not invoked.
func (ms *ModelSync) SubInitFeed(pubName string, usage string, handle func(data *ModelData) error) (err error) {
	if handle == nil {
		return fmt.Errorf("sync: SubInitFeed requires a non-nil handle")
	}
	conf := NewSubConfig(
		// topic
		fmt.Sprintf("%s.%s.%s_%s",
			SYNC_PREFIX, TOPIC_INIT_FEED,
			pubName, ms.ServiceName,
		),
		// channel
		fmt.Sprintf("%s-%s", ms.ServiceName, usage),
	)
	conf.Handler = func(data []byte) error {
		in := new(InitFeed)
		// Decode into the struct itself. Passing &in (a pointer to the pointer)
		// lets a JSON "null" body reset in to nil, and the in.Data access below
		// would then panic with a nil dereference.
		if e := json.Unmarshal(data, in); e != nil {
			return e
		}
		return handle(in.Data)
	}
	return ms.Sub(conf)
}

// SubCrud subscribes to the CRUD topic that publisher pubName emits for
// modelName. The topic is "<SYNC_PREFIX>.<TOPIC_CRUD>.<pubName>.<modelName>"
// and the channel is "<ms.ServiceName>-<usage>". Every message body is decoded
// from JSON into a ModelData and passed to handler.
//
// It returns an error when handler is nil or when Sub fails to set up the
// subscription. For an individual message, a JSON decode failure is returned
// from the per-message callback and handler is not invoked.
func (ms *ModelSync) SubCrud(pubName string, modelName string, usage string, handler func(data *ModelData) error) (err error) {
	if handler == nil {
		return fmt.Errorf("sync: SubCrud requires a non-nil handler")
	}
	subConf := NewSubConfig(
		fmt.Sprintf("%s.%s.%s.%s", SYNC_PREFIX, TOPIC_CRUD, pubName, modelName),
		fmt.Sprintf("%s-%s", ms.ServiceName, usage),
	)
	subConf.Handler = func(data []byte) error {
		md := new(ModelData)
		// Decode into the struct itself. Passing &md (a pointer to the pointer)
		// lets a JSON "null" body reset md to nil, which would then be handed
		// to handler.
		if e := json.Unmarshal(data, md); e != nil {
			return e
		}
		return handler(md)
	}
	return ms.Sub(subConf)
}
